fix(runtime): drain steerQueue at top of RunStream loop to close idle-window race #2492

Merged
simonferquel merged 9 commits into docker:main from simonferquel-clanker:fix/steer-queue-top-of-turn-drain
Apr 22, 2026
@simonferquel-clanker simonferquel-clanker commented Apr 22, 2026

Fixes #2491

Problem

LocalRuntime.Steer enqueues into steerQueue, but the queue was only drained mid-loop — after a batch of tool calls, before the stop check. Two gaps allowed steer messages to be silently dropped:

  1. Idle-window race — a Steer call lands between two RunStream invocations. The queue holds the message but nothing drains it when the next RunStream starts.
  2. First-turn miss — when a RunStream turn produces a plain-text response with no tool calls, res.Stopped fires before the mid-loop drain is reached; the pending steer is never injected.
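
A minimal sketch of the second gap, assuming a simplified loop in which the only drain runs after tool calls. The names `steerQueue`, `turn`, and `runStreamOld` are hypothetical stand-ins for illustration, not the runtime's actual types:

```go
package main

import "fmt"

// steerQueue is a hypothetical stand-in for the runtime's steer queue.
type steerQueue struct{ pending []string }

// Enqueue stores a steer message until something drains it.
func (q *steerQueue) Enqueue(msg string) { q.pending = append(q.pending, msg) }

// Drain returns everything queued so far and empties the queue.
func (q *steerQueue) Drain() []string {
	out := q.pending
	q.pending = nil
	return out
}

// turn is one scripted model response.
type turn struct {
	toolCalls int  // number of tool calls the model requested
	stopped   bool // true when the model produced its final answer
}

// runStreamOld mimics the pre-fix behaviour under the assumption above:
// the queue is drained only after a batch of tool calls.
// It returns the steer messages that reached the session.
func runStreamOld(q *steerQueue, script []turn) []string {
	var injected []string
	for _, t := range script {
		if t.toolCalls > 0 {
			injected = append(injected, q.Drain()...)
		}
		if t.stopped {
			break
		}
	}
	return injected
}

func main() {
	q := &steerQueue{}
	q.Enqueue("use the staging cluster") // Steer lands while the runtime is idle

	// A single plain-text turn: no tool calls, so the drain is never reached.
	injected := runStreamOld(q, []turn{{toolCalls: 0, stopped: true}})
	fmt.Printf("injected=%d stranded=%d\n", len(injected), len(q.pending))
}
```

In this sketch the message stays in the queue after the run ends, which is the silent drop described above.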

Fix

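The RunStream loop now has three steer drain points (two new, plus the existing mid-loop drain), giving the invariant: any Steer call that enqueues successfully while a RunStream is active is consumed within that RunStream, and one that arrives while the runtime is idle is consumed at the start of the next RunStream. Nothing is stranded.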

All three drain sites inject steer messages as plain user messages — no wrapping, no envelope.

  1. Top-of-turn drain (after AgentInfo and contextLimit init, before GetMessages): catches idle-window and first-turn-miss races.
  2. Mid-loop drain (after tool calls, unchanged in placement): catches steers that arrive while tool calls are running.
  3. End-of-iteration drain (inside res.Stopped, before the follow-up queue and break): closes the narrow race between the mid-loop drain and the stop decision.
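
The three drain sites can be sketched roughly as follows. This is a simplified model rather than the code in pkg/runtime/loop.go: `steerQueue`, `modelFunc`, and `runStream` are hypothetical, the session is a plain string slice, and the top-of-turn drain is assumed to run at the start of every iteration:

```go
package main

import "fmt"

// steerQueue is a hypothetical stand-in for the runtime's steer queue.
type steerQueue struct{ pending []string }

func (q *steerQueue) Enqueue(msg string) { q.pending = append(q.pending, msg) }

func (q *steerQueue) Drain() []string {
	out := q.pending
	q.pending = nil
	return out
}

// modelFunc stands in for one model call. It sees the session so far and
// reports how many tool calls it requested and whether it stopped.
type modelFunc func(call int, session []string) (toolCalls int, stopped bool)

// runStream returns the final session and the number of model calls made.
func runStream(q *steerQueue, session []string, model modelFunc) ([]string, int) {
	calls := 0
	for {
		// 1. Top-of-turn drain: steers queued while idle or before the first call.
		session = append(session, q.Drain()...)

		toolCalls, stopped := model(calls, session)
		calls++

		// 2. Mid-loop drain: steers that arrived while tool calls were running.
		if toolCalls > 0 {
			if steered := q.Drain(); len(steered) > 0 {
				session = append(session, steered...)
				continue
			}
		}

		if stopped {
			// 3. End-of-iteration drain: re-check before breaking so a late
			// steer triggers one more model call instead of being stranded.
			if steered := q.Drain(); len(steered) > 0 {
				session = append(session, steered...)
				continue
			}
			break
		}
	}
	return session, calls
}

func main() {
	q := &steerQueue{}
	q.Enqueue("idle steer") // arrives before RunStream starts

	session, calls := runStream(q, nil, func(call int, _ []string) (int, bool) {
		if call == 0 {
			q.Enqueue("late steer") // simulates a Steer racing with the stop decision
		}
		return 0, true // plain-text response, no tool calls
	})
	fmt.Println(session, calls)
}
```

Under these assumptions the idle steer is part of the first model call's context, and the late steer forces a second model call within the same run.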

The appendSteerAndEmit helper centralises the add-message + emit-event idiom used at all three sites.
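
As an illustration of that idiom only, here is a sketch with made-up types. `Session`, `Message`, `Event`, and `SteerMessage` below are assumptions, and the real helper is a method with a different signature:

```go
package main

import "fmt"

// Hypothetical, simplified types for illustration.
type Message struct{ Role, Content string }

type Session struct{ Messages []Message }

type Event struct{ Kind, Content string }

type SteerMessage struct{ Content string }

// appendSteerAndEmit stores the steer as a plain user message (no wrapping)
// and emits the matching event, so every drain site behaves the same way.
func appendSteerAndEmit(sess *Session, sm SteerMessage, events chan<- Event) {
	sess.Messages = append(sess.Messages, Message{Role: "user", Content: sm.Content})
	events <- Event{Kind: "user_message", Content: sm.Content}
}

func main() {
	sess := &Session{}
	events := make(chan Event, 1) // buffered so the sketch does not block

	appendSteerAndEmit(sess, SteerMessage{Content: "stop and summarise"}, events)

	fmt.Println(sess.Messages[0].Role, (<-events).Kind)
}
```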

Empty-session bootstrap

A RunStream started with zero messages in the session but steer messages already queued injects those messages as plain user messages before the first model call. The model receives them as its initial context.

Changes

  • pkg/runtime/loop.go: appendSteerAndEmit helper (no wrapping); three drain sites.
  • pkg/runtime/runtime_test.go: messageRecordingProvider, hookStream, steerInjectProvider; three focused tests:
    • TestSteer_IdleWindowIsConsumedOnNextTurn
    • TestSteer_EmptySessionBootstrap
    • TestSteer_EndOfIterationRaceIsConsumedInCurrentRunStream

…-window race

The steer queue was only drained mid-loop (after tool-call batches). Two
gaps allowed steer messages to be silently dropped:

1. Idle-window race: a Steer call lands between two RunStream invocations;
   the queue holds the message but nothing drains it when the next
   RunStream starts.
2. First-turn miss: when a RunStream turn produces a plain-text response
   with no tool calls, res.Stopped fires before the mid-loop drain is
   reached; the pending steer message is never injected.

Fix: drain steerQueue at the top of the for-loop body (before the first
model call) using the same pattern as the existing mid-loop drain.

Also extract the duplicate system-reminder wrap string into wrapSteerMessage
so the two drain sites cannot drift apart.

Also handles the empty-session bootstrap case: a RunStream started with
zero messages but steer messages already queued injects those messages
before the first model call, giving the model its initial context.

Tests added:
- TestSteer_IdleWindowIsConsumedOnNextTurn
- TestSteer_EmptySessionBootstrap

Fixes docker#2491

Assisted-By: docker-agent
@simonferquel-clanker simonferquel-clanker requested a review from a team as a code owner April 22, 2026 09:49
Comment thread pkg/runtime/loop.go Outdated
// a plain-text response with no tool calls.
if steered := r.steerQueue.Drain(ctx); len(steered) > 0 {
	for _, sm := range steered {
		userMsg := session.UserMessage(wrapSteerMessage(sm.Content), sm.MultiContent...)
Contributor

start of turn message should not get the steerMessage envelope (the agent is not working on anything at this point)

The system-reminder envelope is only appropriate when the agent is
actively working (mid-loop, between tool calls). At the top of a new
turn the runtime is idle, so steer messages are injected as plain
user messages — no wrapping needed.

Assisted-By: docker-agent
@simonferquel-clanker
Contributor Author

Fixed — top-of-turn steer messages are now injected as plain user messages (no system-reminder envelope). The wrapper is kept only on the mid-loop drain where the agent is actively working.

@simonferquel-clanker
Contributor Author

Updated: top-of-turn steer messages are now injected as plain user messages without the system-reminder envelope. The wrapSteerMessage helper is retained only at the mid-loop drain site (where the agent is actively working between tool calls).

- Move top-of-turn drain to after AgentInfo and contextLimit init so
  compactIfNeeded can be called immediately after injection (S1, S2).
- Extract appendSteerAndEmit helper (wrap bool) to eliminate the
  duplicated add-message+emit-event idiom at both drain sites (S3).
- Tighten both steer tests to assert the stored session message and the
  message seen by the model contain raw content with NO system-reminder
  envelope for the top-of-turn path (B1).
- Document messageRecordingProvider fallback behaviour for unexpected
  extra model calls (N4).

Assisted-By: docker-agent
@simonferquel-clanker
Contributor Author

Addressed all review findings:

  • B1 (blocking): Top-of-turn steer messages are already plain user messages (no envelope). Tests now explicitly assert that both the stored session message content and the message seen by the model equal the raw content and do not contain "<system-reminder>" (NotContains).
  • S1: Top-of-turn drain moved to after contextLimit initialization (after AgentInfo emission). compactIfNeeded is called immediately after injection using the captured pre-injection message count.
  • S2: Resolved naturally — drain is now placed after AgentInfo, so event ordering is AgentInfo → steer UserMessages → model call.
  • S3: Extracted appendSteerAndEmit(sess, sm, wrap bool, events) helper. Both drain sites use it; wrap=false for top-of-turn, wrap=true for mid-loop.
  • N4: messageRecordingProvider fallback comment explains the behaviour for unexpected extra calls; assertion tests use require.Len / require.NotEmpty to catch them.

A Steer() call arriving in the narrow window between the mid-loop drain
and the res.Stopped break was stranded until the next RunStream call.

Fix: re-check steerQueue inside the res.Stopped block before breaking.
Any message that enqueued successfully is now guaranteed to be consumed
within the current RunStream. Messages injected here use the
system-reminder envelope (matching mid-turn semantics: the agent just
finished a turn).

T1: both existing top-of-turn steer tests already assert
NotContains(<system-reminder>) in firstCallMsgs — confirmed present.

Test added: TestSteer_EndOfIterationRaceIsConsumedInCurrentRunStream
Uses a provider hook to inject a Steer() synchronously during
CreateChatCompletionStream on the first call, simulating the race.
Asserts: steer UserMessageEvent emitted within the same RunStream,
exactly 2 model calls made, and stored session message carries the
system-reminder envelope.

Assisted-By: docker-agent
Comment thread pkg/runtime/loop.go Outdated
// same system-reminder envelope as mid-turn steers.
if steered := r.steerQueue.Drain(ctx); len(steered) > 0 {
	for _, sm := range steered {
		r.appendSteerAndEmit(sess, sm, true, events)
Contributor

we are about to stop, those messages should not have an envelope

… test

At res.Stopped the current turn is ending and a new one is about to
begin — the agent is not mid-task — so steer messages injected there
should be plain user messages, same as the top-of-turn drain.

Changes:
- loop.go: appendSteerAndEmit(wrap=false) at the end-of-iteration drain
  (inside res.Stopped block); update comment accordingly.
- runtime_test.go: add hookStream type that fires a callback on the
  FinishReasonStop Recv, giving a deterministic injection point;
  rewrite TestSteer_EndOfIterationRaceIsConsumedInCurrentRunStream to
  use hookStream and assert consumed-in-same-RunStream invariant (2
  model calls, UserMessageEvent present, steer content in session).

Assisted-By: docker-agent
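
A rough sketch of the hookStream idea described in this commit, assuming a minimal streaming interface. The `chunk`, `stream`, and `sliceStream` types are hypothetical; the real test embeds *mockStream and keys off FinishReasonStop:

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// chunk is a hypothetical streamed response fragment.
type chunk struct {
	Text         string
	FinishReason string // "stop" marks the final chunk in this sketch
}

// stream is a hypothetical minimal streaming interface.
type stream interface {
	Recv() (chunk, error)
}

// sliceStream replays a fixed list of chunks, then returns io.EOF.
type sliceStream struct {
	chunks []chunk
	next   int
}

func (s *sliceStream) Recv() (chunk, error) {
	if s.next >= len(s.chunks) {
		return chunk{}, io.EOF
	}
	c := s.chunks[s.next]
	s.next++
	return c, nil
}

// hookStream wraps a stream and fires onStop when the stop chunk is received,
// giving a test a deterministic point at which to call Steer.
type hookStream struct {
	stream

	onStop func()
}

func (h *hookStream) Recv() (chunk, error) {
	c, err := h.stream.Recv()
	if err == nil && c.FinishReason == "stop" && h.onStop != nil {
		h.onStop()
	}
	return c, err
}

func main() {
	fired := 0
	s := &hookStream{
		stream: &sliceStream{chunks: []chunk{{Text: "hello"}, {FinishReason: "stop"}}},
		onStop: func() { fired++ }, // a real test would call Steer here
	}
	for {
		if _, err := s.Recv(); errors.Is(err, io.EOF) {
			break
		}
	}
	fmt.Println("onStop fired:", fired)
}
```

Because the callback runs inside Recv, the injection point does not depend on goroutine scheduling.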
@simonferquel-clanker
Contributor Author

Addressed inline change request (discussion_r3123189465): end-of-iteration drain now uses appendSteerAndEmit(wrap=false). At res.Stopped the current turn is ending and a new one is about to begin — same semantics as the top-of-turn drain, so plain user messages are correct. Comment in code updated. Test (hookStream + TestSteer_EndOfIterationRaceIsConsumedInCurrentRunStream) asserts steer consumed within same RunStream: 2 model calls and UserMessageEvent present. Commit: 4a17981.

….Contains

- hookStream: add blank line separating embedded *mockStream from
  onStop field (embeddedstructfieldcheck)
- TestSteer_EndOfIterationRaceIsConsumedInCurrentRunStream: replace
  assert.True(strings.Contains(...)) with assert.Contains (testifylint)

Assisted-By: docker-agent
…envelope caveat

Add assert.Contains on the stored session message content in
TestSteer_EndOfIterationRaceIsConsumedInCurrentRunStream, bringing
it closer to parity with the other two steer tests.

The Equal/NotContains envelope assertions cannot be applied here:
the hookStream injection fires inside Recv() on the stop chunk, so
the steer lands in the queue before tryModelWithFallback returns and
is always consumed by the mid-loop drain (wrap=true). The end-of-
iteration drain (wrap=false, inside res.Stopped) covers the narrower
race reachable only via real goroutine concurrency. A comment in the
test documents this constraint.

Assisted-By: docker-agent
@simonferquel
Contributor

After discussion, we think we should remove all wrapping: depending on the user message, it can create too much confusion. Remove the message wrapper altogether.

All three steer drain sites now inject plain user messages.
Remove wrapSteerMessage helper and the wrap bool parameter from
appendSteerAndEmit — there is no wrapping anywhere.

Update code comments and PR body to reflect the simplified semantics.
Tighten TestSteer_EndOfIterationRaceIsConsumedInCurrentRunStream to use
assert.Equal + assert.NotContains now that the mid-loop drain no longer
wraps, removing the stale wrap=true/wrap=false caveat comment.

Assisted-By: docker-agent
Contributor

@simonferquel simonferquel left a comment

Be more concise in your code comments

Cut multi-line explanations to one short line each at all three
drain sites and on appendSteerAndEmit.

Assisted-By: docker-agent
Contributor

@simonferquel simonferquel left a comment

lgtm, asking for a 2nd review

Comment thread pkg/runtime/loop.go

// Drain steer messages queued while idle or before the first model call
// (covers idle-window and first-turn-miss races).
if steered := r.steerQueue.Drain(ctx); len(steered) > 0 {
Member

I don't understand why this one is needed here?

Contributor

2 cases:

  • if a user sends a steering message while the turn is ending, the steering message ends up being sent after the next tool call of the next turn instead of immediately
  • when used as a library, in an app that considers all messages as "interrupting" (à la Claude Code), this simplifies things a lot: you do not have to treat messages sent while the runner is idle differently from those sent while it is active; they all come from the same queue
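
To illustrate the second case, a sketch of what such an app could look like, assuming a bounded, non-blocking queue. `runner`, `newRunner`, `errQueueFull`, and this toy `RunStream` are hypothetical and only model the top-of-turn drain:

```go
package main

import (
	"errors"
	"fmt"
)

var errQueueFull = errors.New("steer queue full")

// runner is a hypothetical stand-in for the runtime.
type runner struct{ steer chan string }

func newRunner(capacity int) *runner {
	return &runner{steer: make(chan string, capacity)}
}

// Steer enqueues without blocking and reports failure when the queue is full.
func (r *runner) Steer(msg string) error {
	select {
	case r.steer <- msg:
		return nil
	default:
		return errQueueFull
	}
}

// drain collects everything currently queued without blocking.
func (r *runner) drain() []string {
	var out []string
	for {
		select {
		case m := <-r.steer:
			out = append(out, m)
		default:
			return out
		}
	}
}

// RunStream here only models the top-of-turn drain: whatever was queued
// while idle becomes part of the session before the first model call.
func (r *runner) RunStream(session []string) []string {
	return append(session, r.drain()...)
}

func main() {
	r := newRunner(4)

	// The app never asks whether the runner is idle: every message is a steer.
	for _, msg := range []string{"first request", "also check the logs"} {
		if err := r.Steer(msg); err != nil {
			fmt.Println("dropped:", err)
		}
	}

	fmt.Println(r.RunStream(nil))
}
```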

Member

I'm not talking about the case when the turn is being ended, that's lower in the code. I'm asking why we are steering before the turn starts.

@simonferquel-clanker
Contributor Author

Re: #2492 (comment)

The mid-loop drain and the res.Stopped break are not atomic — there is a narrow window between them:

steered := r.steerQueue.Drain(ctx)  // ← drains the queue
if len(steered) > 0 { continue }
if res.Stopped { break }            // ← Steer() arriving here is missed

A Steer() call landing in that window is stranded: on a plain-text turn (no tool calls) there is no subsequent mid-loop drain to pick it up, so it waits for the next RunStream invocation — the same idle-window problem the top-of-turn drain fixes. The re-check at res.Stopped closes this last window so any successfully-enqueued steer is guaranteed to be consumed within the current RunStream.

@rumpl
Member

rumpl commented Apr 22, 2026

Re: #2492 (comment)

The mid-loop drain and the res.Stopped break are not atomic — there is a narrow window between them:

steered := r.steerQueue.Drain(ctx)  // ← drains the queue
if len(steered) > 0 { continue }
if res.Stopped { break }            // ← Steer() arriving here is missed

A Steer() call landing in that window is stranded: on a plain-text turn (no tool calls) there is no subsequent mid-loop drain to pick it up, so it waits for the next RunStream invocation — the same idle-window problem the top-of-turn drain fixes. The re-check at res.Stopped closes this last window so any successfully-enqueued steer is guaranteed to be consumed within the current RunStream.

OK, but there is already a steer drain that was added in the res.Stopped case

@simonferquel-clanker
Contributor Author

Exactly — that drain is the fix. The explanation was describing why it was needed, not a gap that still exists. The code is complete. Happy to get your re-approval if you have no further concerns.

@simonferquel simonferquel merged commit 75a0a9f into docker:main Apr 22, 2026
17 checks passed
Development

Successfully merging this pull request may close these issues.

Steer queue: drain at RunStream entry to close idle-window race

3 participants